Kafka Connect: Surface commit failures instead of silently swallowing them#16237
Kafka Connect: Surface commit failures instead of silently swallowing them#16237yadavay-amzn wants to merge 1 commit into
Conversation
Baunsgaard
left a comment
There was a problem hiding this comment.
Good catch and cleanup.
However, the error logging strategy you are proposing seems to be double-logging every commit failure in CoordinatorThread.run(). I have left some specific suggestions.
| LOG.error( | ||
| "Coordinator {} failed to commit for commit {}; propagating failure to terminate task", | ||
| taskId, | ||
| commitState.currentCommitId(), | ||
| e); | ||
| throw e; |
There was a problem hiding this comment.
change it to
throw new RuntimeException(
String.format("Coordinator %s failed to commit %s",
taskId, commitState.currentCommitId()),
e);
This allows the further up CoordinatorThread.run() catch to log the error once, and still attribute the error to this location.
There was a problem hiding this comment.
Done, updated in latest revision
| ImmutableList.of(), | ||
| EventTestUtil.now())) | ||
| .isInstanceOf(CommitFailedException.class) | ||
| .hasMessageContaining("Glue detected concurrent update"); |
There was a problem hiding this comment.
if you do the above change, then i think this need to be :
.hasRootCauseMessage("Glue detected concurrent update");
dd4620e to
8467e0d
Compare
|
Thanks @Baunsgaard for taking a look, you're right about the double-logging. |
Baunsgaard
left a comment
There was a problem hiding this comment.
LGTM, left one nit for production code. Tests looks fine!
| // Do not swallow commit failures: wrap with Coordinator context and propagate so | ||
| // CoordinatorThread.run() terminates and the Kafka Connect task transitions to FAILED | ||
| // instead of silently dropping data (e.g., CommitFailedException from catalogs that | ||
| // detect concurrent updates). The taskId and commitId are embedded in the wrapper | ||
| // message so that the single log emitted by CoordinatorThread retains the context. |
There was a problem hiding this comment.
nit: I think it is too much to leave this comment. It is a personal preference, but I would remove it.
… them The Coordinator previously caught all exceptions from doCommit() and only logged a warning, causing the connector to stay RUNNING after a CommitFailedException (e.g., Glue concurrent update) while silently dropping data. Propagate the exception so CoordinatorThread terminates and the Kafka Connect task transitions to FAILED. Fixes apache#15878
8467e0d to
97cdeb1
Compare
|
Done — removed the comment block. |
Fixes #15878.
Problem
The Kafka Connect
Coordinatorpreviously caughtExceptionarounddoCommit()and only logged a warning, so when a commit failed (e.g., aCommitFailedExceptionfrom Glue detecting a concurrent table update), the connector stayedRUNNINGwhile silently dropping the data that was in flight.Fix
Remove the catch-all around
doCommit()and instead log atERRORlevel with the task id and commit id before rethrowing.CoordinatorThread.run()already terminates the thread on uncaught exceptions, which transitions the Kafka Connect task toFAILED— so failures are now surfaced rather than dropped.The
finallyblock that callscommitState.endCurrentCommit()is preserved so per-commit state is cleaned up regardless of the outcome.Testing
testCommitFailedExceptionPropagateswhich mocks a catalog-sideCommitFailedExceptiononAppendFiles.commit()and asserts it propagates out ofCoordinator.process(). Without the fix, this test fails because the exception is swallowed.testCoordinatorWithBadDataFileandtestCoordinatorCommittedOffsetValidation) that previously relied on silent-swallow behaviour; they now assert the specific exception propagates (IllegalArgumentExceptionfor bad partition spec,ValidationExceptionfor stale offsets).TestCoordinatorsuite passes locally (8/8).spotlessApplypasses.